[FLINK-39437][table] Support interruptible timers in PTFs#27962
fhueske wants to merge 3 commits into apache:master
Pull request overview
Enables interruptible timer processing for Process Table Function (PTF) operators and adjusts watermark handling so timer callbacks observe a consistent watermark when timers are fired incrementally via the mailbox.
Changes:
- Enable interruptible timers for PTF operators by overriding `useInterruptibleTimers(...)` in `AbstractProcessTableOperator`.
- Update `processWatermark(...)` ordering to ingest the new watermark into the PTF runner before delegating to the superclass (which advances the timer service and may fire timers).
- Add a new planner semantic test program and test function intended to validate watermark consistency across multiple same-timestamp named timers.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java | Enables interruptible timers for PTF operators and reorders watermark ingestion vs. timer firing. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java | Adds a new PTF test function that registers multiple same-timestamp named timers. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java | Adds a new TableTestProgram with expected outputs for consistent watermark observation in timer callbacks. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java | Registers the new semantic test program in the PTF semantic test suite. |
    }

    @Override
    public final boolean useInterruptibleTimers(ReadableConfig config) {
[nit] Could `final` be dropped on `useInterruptibleTimers`? It would be more consistent: all seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) declare the method without `final`.
I added the `final` on purpose to prevent this method from being overridden by extending classes.
`AbstractProcessTableOperator` is an internal class. If it becomes necessary for a child class to override the method, we can always remove the `final`.
Until then, it documents that all child classes must be able to handle interruptible timers.
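As a minimal sketch of what the `final` modifier buys here: the classes below are hypothetical stand-ins, not the Flink classes, the `ReadableConfig` parameter is omitted for brevity, and the `false` default in the base class is an assumption of the sketch.

```java
/** Toy hierarchy showing a final override; all class names are hypothetical. */
public class FinalOverrideSketch {

    /** Stand-in for a generic base operator. The default value is an assumption. */
    public abstract static class BaseOperator {
        public boolean useInterruptibleTimers() {
            return false;
        }
    }

    /** Stand-in for the abstract PTF operator: opts in for every subclass. */
    public abstract static class AbstractTableOperator extends BaseOperator {
        @Override
        public final boolean useInterruptibleTimers() {
            return true;
        }
    }

    /** Stand-in for a concrete PTF operator. */
    public static class ConcreteTableOperator extends AbstractTableOperator {
        // Declaring useInterruptibleTimers() here would not compile because the
        // parent method is final, so every subclass inherits the opt-in.
    }

    public static void main(String[] args) {
        BaseOperator operator = new ConcreteTableOperator();
        System.out.println(operator.useInterruptibleTimers());
    }
}
```

If a subclass ever needs to opt out, removing `final` from the parent is a one-line, source-compatible change for an internal class.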
    .build())
    .runSql(
    "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
    .build();
Perhaps we need to create a dedicated HarnessTest to verify that the behavior is still correct when timer firing is interrupted.
I've added `ProcessSetTableOperatorInterruptibleTimersTest` to assert the call order of timers.
Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to return `true`, activating the `MailboxWatermarkProcessor` for PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches. Also reorder `processWatermark()` to call `ingestCurrentWatermarkEvent()` before `super.processWatermark()` to keep the runner's watermark consistent with the timer service watermark, which is advanced before any timer fires. Add a test (`PROCESS_CONSISTENT_WATERMARK_TIMERS`) that validates multiple named timers registered at the same timestamp all see a consistent watermark in their callbacks.
…nterruptible timers
    // TODO this line has issues with interruptible timers, see FLINK-39437
    // Update the runner's watermark before firing timers to keep it consistent with the
    // timer service watermark, which is also advanced before any timer fires.
    processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
I don't think that this line swap is sufficient. The docs in MailboxWatermarkProcessor state that there might be a chance that new rows enter the PTF while the current watermark has not fully been processed. We should call ingestCurrentWatermarkEvent only after the watermark has been fully processed. Otherwise a PTF that implements late data handling manually and relies on TimeContext.currentWatermark would not behave correctly. If you check MailboxPartialWatermarkProcessor it only updates the current watermark upon completion of processing.
Before processing any timers, `MailboxPartialWatermarkProcessor`, `MailboxWatermarkProcessor` and `InternalTimerServiceImpl` update their internal watermarks to the new watermark. So the operator's internal state is already at the new watermark before all timers have been processed. Only the WM emission is delayed until all timers are handled.
I think this makes sense, also in the context of late data handling, because the operator has already received the WM (all following records with a ts < wm are de facto late) and started to operate on it. The first triggered timers might already clean up state and/or emit data. So even before all timers have been processed, data can (and IMO should) be treated as late.
Imagine a case with two timers for keys A and B. A's timer has fired and cleaned up A's state. Then there's an interrupt with a late record for key A. A's data is gone, so the record cannot be processed correctly. Receiving a late record for B might be OK because the data is still present, but it is also late.
Flipping the calls here just makes it more obvious that the internal WM state is updated before the timers are called. Even with interrupted timers, the `ingestCurrentWatermarkEvent()` call would happen before any intermediate record is processed.
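The two-key scenario can be played through in a toy model. This is only a sketch under stated assumptions: `InterruptedTimersSketch` and all of its methods are hypothetical, a single `fireNextTimer()` call stands in for one mailbox iteration, and a record counts as late when its timestamp is below the already-advanced watermark.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of interrupted timer firing for keys A and B; all names are hypothetical. */
public class InterruptedTimersSketch {

    private long watermark = Long.MIN_VALUE;
    private final Map<String, Integer> stateByKey = new HashMap<>();
    private final Deque<String> pendingTimers = new ArrayDeque<>();

    /** Stores state for a key and registers a timer that will clean it up. */
    public void registerTimer(String key, int value) {
        stateByKey.put(key, value);
        pendingTimers.add(key);
    }

    /** The watermark is advanced before any timer fires. */
    public void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
    }

    /** Fires a single timer (one mailbox iteration); returns false if none is left. */
    public boolean fireNextTimer() {
        String key = pendingTimers.poll();
        if (key == null) {
            return false;
        }
        stateByKey.remove(key); // the timer callback cleans up the key's state
        return true;
    }

    /** Processes a record that arrives between two timer firings. */
    public String processRecord(String key, long timestamp) {
        boolean late = timestamp < watermark;
        boolean stateExists = stateByKey.containsKey(key);
        return "late=" + late + " stateExists=" + stateExists;
    }

    public static void main(String[] args) {
        InterruptedTimersSketch op = new InterruptedTimersSketch();
        op.registerTimer("A", 10);
        op.registerTimer("B", 20);
        op.advanceWatermark(100L);
        op.fireNextTimer();                              // A fires, A's state is gone
        System.out.println(op.processRecord("A", 50L)); // interrupt: record for A
        System.out.println(op.processRecord("B", 50L)); // interrupt: record for B
        op.fireNextTimer();                              // B fires afterwards
    }
}
```

In this model both interrupting records are late with respect to the advanced watermark; only the record for B still finds its state, which matches the argument that lateness should be judged against the new watermark rather than against timer progress.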
What is the purpose of the change
Enable interruptible timers for PTFs.
Brief change log
Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to return `true`, activating the `MailboxWatermarkProcessor` for PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches.
Also reorder `processWatermark()` to call `ingestCurrentWatermarkEvent()` before `super.processWatermark()`, ensuring all timer callbacks (including those deferred across mailbox iterations) see a consistent watermark in the runner. This matches the behavior of `WritableInternalTimeContext.currentWatermark()`, which reads from the timer service and already sees the new watermark before any timer fires.
Verifying this change
Does this pull request potentially affect one of the following parts:
`@Public(Evolving)`: no
Documentation